ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [36/50] [abbrv] incubator-ignite git commit: # ignite-6 merge from sprint-1
Date Tue, 03 Feb 2015 12:32:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index b73b0d1,dccb2c3..0edaffe
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -26,17 -26,10 +26,18 @@@ import org.apache.ignite.cluster.*
  import org.apache.ignite.configuration.*;
  import org.apache.ignite.fs.*;
  import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.datastructures.*;
 +import org.apache.ignite.internal.processors.cache.version.*;
 +import org.apache.ignite.internal.util.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.mxbean.*;
 +import org.apache.ignite.plugin.security.*;
 +import org.apache.ignite.portables.*;
 +import org.apache.ignite.resources.*;
 +import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.compute.*;
  import org.apache.ignite.internal.processors.cache.affinity.*;
 -import org.apache.ignite.internal.processors.cache.datastructures.*;
  import org.apache.ignite.internal.processors.cache.distributed.dht.*;
  import org.apache.ignite.internal.processors.cache.dr.*;
  import org.apache.ignite.internal.processors.cache.query.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 797cfe3,b1abc8e..b5c91a3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -30,12 -30,6 +30,13 @@@ import org.apache.ignite.fs.*
  import org.apache.ignite.internal.*;
  import org.apache.ignite.internal.processors.*;
  import org.apache.ignite.internal.processors.cache.datastructures.*;
 +import org.apache.ignite.internal.processors.datastructures.*;
 +import org.apache.ignite.internal.processors.cache.version.*;
 +import org.apache.ignite.internal.util.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.lifecycle.LifecycleAware;
 +import org.apache.ignite.spi.*;
++import org.apache.ignite.internal.processors.cache.datastructures.*;
  import org.apache.ignite.internal.processors.cache.distributed.dht.*;
  import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
  import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 14b2673,b20628e..52c6c32
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@@ -23,15 -23,18 +23,17 @@@ import org.apache.ignite.cache.affinity
  import org.apache.ignite.cache.datastructures.*;
  import org.apache.ignite.cache.query.*;
  import org.apache.ignite.cluster.*;
- import org.apache.ignite.internal.processors.cache.version.*;
- import org.apache.ignite.lang.*;
- import org.apache.ignite.mxbean.*;
- import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.*;
  import org.apache.ignite.internal.processors.cache.affinity.*;
 -import org.apache.ignite.internal.processors.cache.datastructures.*;
  import org.apache.ignite.internal.processors.cache.dr.*;
  import org.apache.ignite.internal.processors.cache.query.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
  import org.apache.ignite.internal.util.tostring.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.mxbean.*;
+ import org.apache.ignite.transactions.*;
  import org.jetbrains.annotations.*;
  
  import javax.cache.expiry.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index a2bc582,0000000..131fa3c
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@@ -1,256 -1,0 +1,255 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastructures;
 +
 +import org.apache.ignite.*;
- import org.apache.ignite.cache.*;
 +import org.apache.ignite.internal.processors.cache.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.jetbrains.annotations.*;
 +
 +import javax.cache.processor.*;
 +import java.util.*;
 +
 +/**
 + * {@link org.apache.ignite.IgniteQueue} implementation using atomic cache.
 + */
 +public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
 +    /** */
 +    private static final long RETRY_TIMEOUT = 3000;
 +
 +    /**
 +     * @param queueName Queue name.
 +     * @param hdr Queue header.
 +     * @param cctx Cache context.
 +     */
 +    public GridAtomicCacheQueueImpl(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?, ?> cctx) {
 +        super(queueName, hdr, cctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public boolean offer(T item) throws IgniteException {
 +        try {
 +            Long idx = transformHeader(new AddProcessor(id, 1));
 +
 +            if (idx == null)
 +                return false;
 +
 +            checkRemoved(idx);
 +
 +            int cnt = 0;
 +
 +            GridCacheQueueItemKey key = itemKey(idx);
 +
 +            while (true) {
 +                try {
 +                    cache.put(key, item);
 +
 +                    break;
 +                }
-                 catch (CachePartialUpdateException e) {
++                catch (CachePartialUpdateCheckedException e) {
 +                    if (cnt++ == MAX_UPDATE_RETRIES)
 +                        throw e;
 +                    else {
 +                        U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']');
 +
 +                        U.sleep(RETRY_DELAY);
 +                    }
 +                }
 +            }
 +
 +            return true;
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IgniteException(e);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Nullable @Override public T poll() throws IgniteException {
 +        try {
 +            while (true) {
 +                Long idx = transformHeader(new PollProcessor(id));
 +
 +                if (idx == null)
 +                    return null;
 +
 +                checkRemoved(idx);
 +
 +                GridCacheQueueItemKey key = itemKey(idx);
 +
 +                int cnt = 0;
 +
 +                long stop = 0;
 +
 +                while (true) {
 +                    try {
-                         T data = (T)cache.getAndRemove(key);
++                        T data = (T)cache.remove(key, null);
 +
 +                        if (data != null)
 +                            return data;
 +
 +                        if (stop == 0)
 +                            stop = U.currentTimeMillis() + RETRY_TIMEOUT;
 +
 +                        while (U.currentTimeMillis() < stop ) {
-                             data = (T)cache.getAndRemove(key);
++                            data = (T)cache.remove(key, null);
 +
 +                            if (data != null)
 +                                return data;
 +                        }
 +
 +                        break;
 +                    }
-                     catch (CachePartialUpdateException e) {
++                    catch (CachePartialUpdateCheckedException e) {
 +                        if (cnt++ == MAX_UPDATE_RETRIES)
 +                            throw e;
 +                        else {
 +                            U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']');
 +
 +                            U.sleep(RETRY_DELAY);
 +                        }
 +                    }
 +                }
 +
 +                U.warn(log, "Failed to get item, will retry poll [queue=" + queueName + ", idx=" + idx + ']');
 +            }
 +        }
 +        catch (IgniteCheckedException e) {
-             throw new IgniteException(e);
++            throw U.convertException(e);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public boolean addAll(Collection<? extends T> items) {
 +        A.notNull(items, "items");
 +
 +        try {
 +            Long idx = transformHeader(new AddProcessor(id, items.size()));
 +
 +            if (idx == null)
 +                return false;
 +
 +            checkRemoved(idx);
 +
 +            Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
 +
 +            for (T item : items) {
 +                putMap.put(itemKey(idx), item);
 +
 +                idx++;
 +            }
 +
 +            int cnt = 0;
 +
 +            while (true) {
 +                try {
-                     cache.putAll(putMap);
++                    cache.putAll(putMap, null);
 +
 +                    break;
 +                }
-                 catch (CachePartialUpdateException e) {
++                catch (CachePartialUpdateCheckedException e) {
 +                    if (cnt++ == MAX_UPDATE_RETRIES)
 +                        throw e;
 +                    else {
 +                        U.warn(log, "Failed to add items, will retry [err=" + e + ']');
 +
 +                        U.sleep(RETRY_DELAY);
 +                    }
 +                }
 +            }
 +
 +            return true;
 +        }
 +        catch (IgniteCheckedException e) {
-             throw new IgniteException(e);
++            throw U.convertException(e);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override protected void removeItem(long rmvIdx) throws IgniteCheckedException {
-         Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx));
++        Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
 +
 +        if (idx != null) {
 +            checkRemoved(idx);
 +
 +            GridCacheQueueItemKey key = itemKey(idx);
 +
 +            int cnt = 0;
 +
 +            long stop = 0;
 +
 +            while (true) {
 +                try {
-                     if (cache.remove(key))
++                    if (cache.removex(key, null))
 +                        return;
 +
 +                    if (stop == 0)
 +                        stop = U.currentTimeMillis() + RETRY_TIMEOUT;
 +
 +                    while (U.currentTimeMillis() < stop ) {
-                         if (cache.remove(key))
++                        if (cache.removex(key, null))
 +                            return;
 +                    }
 +
 +                    break;
 +                }
-                 catch (CachePartialUpdateException e) {
++                catch (CachePartialUpdateCheckedException e) {
 +                    if (cnt++ == MAX_UPDATE_RETRIES)
 +                        throw e;
 +                    else {
 +                        U.warn(log, "Failed to add items, will retry [err=" + e + ']');
 +
 +                        U.sleep(RETRY_DELAY);
 +                    }
 +                }
 +            }
 +
 +            U.warn(log, "Failed to remove item, [queue=" + queueName + ", idx=" + idx + ']');
 +        }
 +    }
 +
 +    /**
 +     * @param c EntryProcessor to be applied for queue header.
 +     * @return Value computed by the entry processor.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    @SuppressWarnings("unchecked")
 +    @Nullable private Long transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long> c)
 +        throws IgniteCheckedException {
 +        int cnt = 0;
 +
 +        while (true) {
 +            try {
-                 return (Long)cache.invoke(queueKey, c);
++                return (Long)cache.invoke(queueKey, c).get();
 +            }
-             catch (CachePartialUpdateException e) {
++            catch (CachePartialUpdateCheckedException e) {
 +                if (cnt++ == MAX_UPDATE_RETRIES)
 +                    throw e;
 +                else {
 +                    U.warn(log, "Failed to update queue header, will retry [err=" + e + ']');
 +
 +                    U.sleep(RETRY_DELAY);
 +                }
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java
index 13d09fc,0000000..7da8aa7
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAnnotationHelper.java
@@@ -1,240 -1,0 +1,240 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastructures;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.util.*;
- import org.apache.ignite.lang.*;
 +import org.apache.ignite.internal.util.typedef.*;
++import org.apache.ignite.lang.*;
 +
 +import java.lang.annotation.*;
 +import java.lang.reflect.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +
 +/**
 + * Utility class for getting annotated values from classes.
 + * Contains local cache of annotated methods and fields by classes for best performance.
 + */
 +public class GridCacheAnnotationHelper<A extends Annotation> {
 +    /** Number of entries to keep in annotation cache. */
 +    private static final int DFLT_CLASS_CACHE_SIZE = 1000;
 +
 +    /** Field cache. */
 +    private final GridBoundedLinkedHashMap<Class<?>, List<Field>> fieldCache;
 +
 +    /** Method cache. */
 +    private final GridBoundedLinkedHashMap<Class<?>, List<Method>> mtdCache;
 +
 +    /** Annotation class. */
 +    private final Class<A> annCls;
 +
 +    /** Mutex. */
 +    private final Object mux = new Object();
 +
 +    /**
 +     * Constructor.
 +     *
 +     * @param annCls Annotation class.
 +     */
 +    public GridCacheAnnotationHelper(Class<A> annCls) {
 +        this(annCls, DFLT_CLASS_CACHE_SIZE);
 +    }
 +
 +    /**
 +     * Constructor.
 +     *
 +     * @param annCls Annotation class.
 +     * @param capacity Capacity of local caches.
 +     */
 +    public GridCacheAnnotationHelper(Class<A> annCls, int capacity) {
 +        assert annCls != null : "Annotated class mustn't be null.";
 +        assert capacity > 0 : "Capacity must be more then zero.";
 +
 +        this.annCls = annCls;
 +
 +        fieldCache = new GridBoundedLinkedHashMap<>(capacity);
 +
 +        mtdCache = new GridBoundedLinkedHashMap<>(capacity);
 +    }
 +
 +    /**
 +     * Returns annotated value.
 +     *
 +     * @param target Object to find a value in.
 +     * @return Value of annotated field or method.
 +     * @throws IgniteCheckedException If failed to find.
 +     */
 +    public Object annotatedValue(Object target) throws IgniteCheckedException {
 +        IgniteBiTuple<Object, Boolean> res = annotatedValue(target, new HashSet<>(), false);
 +
 +        assert res != null;
 +
 +        return res.get1();
 +    }
 +
 +    /**
 +     * Returns annotated value.
 +     *
 +     * @param target Object to find a value in.
 +     * @param visited Set of visited objects to avoid cycling.
 +     * @param annFound Flag indicating if value has already been found.
 +     * @return Value of annotated field or method.
 +     * @throws IgniteCheckedException If failed to find.
 +     */
 +    private IgniteBiTuple<Object, Boolean> annotatedValue(Object target, Set<Object> visited, boolean annFound)
 +        throws IgniteCheckedException {
 +        assert target != null;
 +
 +        // To avoid infinite recursion.
 +        if (visited.contains(target))
 +            return F.t(null, annFound);
 +
 +        visited.add(target);
 +
 +        Object val = null;
 +
 +        for (Class<?> cls = target.getClass(); !cls.equals(Object.class); cls = cls.getSuperclass()) {
 +            // Fields.
 +            for (Field f : fieldsWithAnnotation(cls)) {
 +                f.setAccessible(true);
 +
 +                Object fieldVal;
 +
 +                try {
 +                    fieldVal = f.get(target);
 +                }
 +                catch (IllegalAccessException e) {
 +                    throw new IgniteCheckedException("Failed to get annotated field value [cls=" + cls.getName() +
 +                        ", ann=" + annCls.getSimpleName()+']', e);
 +                }
 +
 +                if (needsRecursion(f)) {
 +                    if (fieldVal != null) {
 +                        // Recursion.
 +                        IgniteBiTuple<Object, Boolean> tup = annotatedValue(fieldVal, visited, annFound);
 +
 +                        if (!annFound && tup.get2())
 +                            // Update value only if annotation was found in recursive call.
 +                            val = tup.get1();
 +
 +                        annFound = tup.get2();
 +                    }
 +                }
 +                else {
 +                    if (annFound)
 +                        throw new IgniteCheckedException("Multiple annotations has been found [cls=" + cls.getName() +
 +                            ", ann=" + annCls.getSimpleName() + ']');
 +
 +                    val = fieldVal;
 +
 +                    annFound = true;
 +                }
 +            }
 +
 +            // Methods.
 +            for (Method m : methodsWithAnnotation(cls)) {
 +                if (annFound)
 +                    throw new IgniteCheckedException("Multiple annotations has been found [cls=" + cls.getName() +
 +                        ", ann=" + annCls.getSimpleName() + ']');
 +
 +                m.setAccessible(true);
 +
 +                try {
 +                    val = m.invoke(target);
 +                }
 +                catch (Exception e) {
 +                    throw new IgniteCheckedException("Failed to get annotated method value [cls=" + cls.getName() +
 +                        ", ann=" + annCls.getSimpleName()+']', e);
 +                }
 +
 +                annFound = true;
 +            }
 +        }
 +
 +        return F.t(val, annFound);
 +    }
 +
 +    /**
 +     * @param f Field.
 +     * @return {@code true} if recursive inspection is required.
 +     */
 +    private boolean needsRecursion(Field f) {
 +        assert f != null;
 +
 +        // Need to inspect anonymous classes, callable and runnable instances.
 +        return f.getName().startsWith("this$") || f.getName().startsWith("val$") ||
 +            Callable.class.isAssignableFrom(f.getType()) || Runnable.class.isAssignableFrom(f.getType());
 +    }
 +
 +    /**
 +     * Gets all entries from the specified class or its super-classes that have
 +     * been annotated with annotation provided.
 +     *
 +     * @param cls Class in which search for fields.
 +     * @return Set of entries with given annotations.
 +     */
 +    private Iterable<Field> fieldsWithAnnotation(Class<?> cls) {
 +        synchronized (mux) {
 +            List<Field> fields = fieldCache.get(cls);
 +            if (fields == null) {
 +                fields = new ArrayList<>();
 +
 +                for (Field field : cls.getDeclaredFields()) {
 +                    Annotation ann = field.getAnnotation(annCls);
 +
 +                    if (ann != null || needsRecursion(field))
 +                        fields.add(field);
 +                }
 +
 +                if (!fields.isEmpty())
 +                    fieldCache.put(cls, fields);
 +            }
 +
 +            return fields;
 +        }
 +    }
 +
 +    /**
 +     * Gets set of methods with given annotation.
 +     *
 +     * @param cls Class in which search for methods.
 +     * @return Set of methods with given annotations.
 +     */
 +    private Iterable<Method> methodsWithAnnotation(Class<?> cls) {
 +        synchronized (mux) {
 +            List<Method> mtds = mtdCache.get(cls);
 +
 +            if (mtds == null) {
 +                mtds = new ArrayList<>();
 +
 +                for (Method mtd : cls.getDeclaredMethods()) {
 +                    Annotation ann = mtd.getAnnotation(annCls);
 +
 +                    if (ann != null)
 +                        mtds.add(mtd);
 +                }
 +
 +                if (!mtds.isEmpty())
 +                    mtdCache.put(cls, mtds);
 +            }
 +
 +            return mtds;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index 9bb71b2,0000000..10e8e6b
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@@ -1,510 -1,0 +1,510 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastructures;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.datastructures.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
- import org.apache.ignite.lang.*;
- import org.apache.ignite.transactions.*;
++import org.apache.ignite.internal.processors.cache.transactions.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
++import org.apache.ignite.lang.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.io.*;
 +import java.util.concurrent.*;
 +
 +import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
 +import static org.apache.ignite.transactions.IgniteTxIsolation.*;
 +
 +/**
 + * Cache atomic long implementation.
 + */
 +public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Externalizable {
 +    /** */
 +    private static final long serialVersionUID = 0L;
 +
 +    /** Deserialization stash. */
 +    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
 +        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
 +            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
 +                return F.t2();
 +            }
 +        };
 +
 +    /** Logger. */
 +    private IgniteLogger log;
 +
 +    /** Atomic long name. */
 +    private String name;
 +
 +    /** Removed flag.*/
 +    private volatile boolean rmvd;
 +
 +    /** Atomic long key. */
 +    private GridCacheInternalKey key;
 +
 +    /** Atomic long projection. */
 +    private CacheProjection<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView;
 +
 +    /** Cache context. */
 +    private GridCacheContext ctx;
 +
 +    /** Callable for {@link #get()}. */
 +    private final Callable<Long> getCall = new Callable<Long>() {
 +        @Override public Long call() throws Exception {
 +            GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +            if (val == null)
 +                throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +            return val.get();
 +        }
 +    };
 +
 +    /** Callable for {@link #incrementAndGet()}. */
 +    private final Callable<Long> incAndGetCall = new Callable<Long>() {
 +        @Override public Long call() throws Exception {
-             try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                if (val == null)
 +                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                long retVal = val.get() + 1;
 +
 +                val.set(retVal);
 +
 +                atomicView.put(key, val);
 +
 +                tx.commit();
 +
 +                return retVal;
 +            }
 +            catch (Error | Exception e) {
 +                U.error(log, "Failed to increment and get: " + this, e);
 +
 +                throw e;
 +            }
 +        }
 +    };
 +
 +    /** Callable for {@link #getAndIncrement()}. */
 +    private final Callable<Long> getAndIncCall = new Callable<Long>() {
 +        @Override public Long call() throws Exception {
-             try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                if (val == null)
 +                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                long retVal = val.get();
 +
 +                val.set(retVal + 1);
 +
 +                atomicView.put(key, val);
 +
 +                tx.commit();
 +
 +                return retVal;
 +            }
 +            catch (Error | Exception e) {
 +                U.error(log, "Failed to get and increment: " + this, e);
 +
 +                throw e;
 +            }
 +        }
 +    };
 +
 +    /** Callable for {@link #decrementAndGet()}. */
 +    private final Callable<Long> decAndGetCall = new Callable<Long>() {
 +        @Override public Long call() throws Exception {
-             try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                if (val == null)
 +                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                long retVal = val.get() - 1;
 +
 +                val.set(retVal);
 +
 +                atomicView.put(key, val);
 +
 +                tx.commit();
 +
 +                return retVal;
 +            }
 +            catch (Error | Exception e) {
 +                U.error(log, "Failed to decrement and get: " + this, e);
 +
 +                throw e;
 +            }
 +        }
 +    };
 +
 +    /** Callable for {@link #getAndDecrement()}: stores the value minus one and returns the previous value. */
 +    private final Callable<Long> getAndDecCall = new Callable<Long>() {
 +        @Override public Long call() throws Exception {
-             try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                if (val == null)
 +                    throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                long retVal = val.get();
 +
 +                val.set(retVal - 1);
 +
 +                atomicView.put(key, val);
 +
 +                tx.commit();
 +
 +                return retVal;
 +            }
 +            catch (Error | Exception e) {
 +                U.error(log, "Failed to get and decrement: " + this, e);
 +
 +                throw e;
 +            }
 +        }
 +    };
 +
 +    /**
 +     * Empty constructor required by {@link Externalizable}.
 +     */
 +    public GridCacheAtomicLongImpl() {
 +        // No-op.
 +    }
 +
 +    /**
 +     * Default constructor.
 +     *
 +     * @param name Atomic long name.
 +     * @param key Atomic long key.
 +     * @param atomicView Atomic projection.
 +     * @param ctx CacheContext.
 +     */
 +    public GridCacheAtomicLongImpl(String name, GridCacheInternalKey key,
 +        CacheProjection<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView, GridCacheContext ctx) {
 +        assert key != null;
 +        assert atomicView != null;
 +        assert ctx != null;
 +        assert name != null;
 +
 +        this.ctx = ctx;
 +        this.key = key;
 +        this.atomicView = atomicView;
 +        this.name = name;
 +
 +        log = ctx.gridConfig().getGridLogger().getLogger(getClass());
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String name() {
 +        return name;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long get() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(getCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long incrementAndGet() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(incAndGetCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long getAndIncrement() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(getAndIncCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long addAndGet(long l) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(internalAddAndGet(l), ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long getAndAdd(long l) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(internalGetAndAdd(l), ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long decrementAndGet() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(decAndGetCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long getAndDecrement() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(getAndDecCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long getAndSet(long l) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(internalGetAndSet(l), ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean compareAndSet(long expVal, long newVal)
 +        throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(internalCompareAndSet(expVal, newVal), ctx);
 +    }
 +
 +    /**
 +     * Check removed flag.
 +     *
 +     * @throws IgniteCheckedException If removed.
 +     */
 +    private void checkRemoved() throws IgniteCheckedException {
 +        if (rmvd)
 +            throw new DataStructureRemovedException("Atomic long was removed from cache: " + name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean onRemoved() {
 +        return rmvd = true;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onInvalid(@Nullable Exception err) {
 +        // No-op.
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridCacheInternalKey key() {
 +        return key;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean removed() {
 +        return rmvd;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void close() {
 +        if (rmvd)
 +            return;
 +
 +        try {
 +            ctx.kernalContext().dataStructures().removeAtomicLong(name);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IgniteException(e);
 +        }
 +    }
 +
 +    /**
 +     * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode.
 +     *
 +     * @param l Value will be added to atomic long.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Long> internalAddAndGet(final long l) {
 +        return new Callable<Long>() {
 +            @Override public Long call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                    if (val == null)
 +                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                    long retVal = val.get() + l;
 +
 +                    val.set(retVal);
 +
 +                    atomicView.put(key, val);
 +
 +                    tx.commit();
 +
 +                    return retVal;
 +                }
 +                catch (Error | Exception e) {
 +                    U.error(log, "Failed to add and get: " + this, e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode.
 +     *
 +     * @param l Value will be added to atomic long.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Long> internalGetAndAdd(final long l) {
 +        return new Callable<Long>() {
 +            @Override public Long call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                    if (val == null)
 +                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                    long retVal = val.get();
 +
 +                    val.set(retVal + l);
 +
 +                    atomicView.put(key, val);
 +
 +                    tx.commit();
 +
 +                    return retVal;
 +                }
 +                catch (Error | Exception e) {
 +                    U.error(log, "Failed to get and add: " + this, e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode.
 +     *
 +     * @param l Value to set the atomic long to.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Long> internalGetAndSet(final long l) {
 +        return new Callable<Long>() {
 +            @Override public Long call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                    if (val == null)
 +                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                    long retVal = val.get();
 +
 +                    val.set(l);
 +
 +                    atomicView.put(key, val);
 +
 +                    tx.commit();
 +
 +                    return retVal;
 +                }
 +                catch (Error | Exception e) {
 +                    U.error(log, "Failed to get and set: " + this, e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Method returns callable for execution {@link #compareAndSet(long, long)}
 +     * operation in async and sync mode.
 +     *
 +     * @param expVal Expected atomic long value.
 +     * @param newVal New atomic long value.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Boolean> internalCompareAndSet(final long expVal, final long newVal) {
 +        return new Callable<Boolean>() {
 +            @Override public Boolean call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicLongValue val = atomicView.get(key);
 +
 +                    if (val == null)
 +                        throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
 +
 +                    boolean retVal = val.get() == expVal;
 +
 +                    if (retVal) {
 +                        val.set(newVal);
 +
 +                        atomicView.put(key, val);
 +
 +                        tx.commit();
 +                    }
 +
 +                    return retVal;
 +                }
 +                catch (Error | Exception e) {
 +                    U.error(log, "Failed to compare and set: " + this, e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void writeExternal(ObjectOutput out) throws IOException {
 +        out.writeObject(ctx.kernalContext());
 +        out.writeUTF(name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +        IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +        t.set1((GridKernalContext)in.readObject());
 +        t.set2(in.readUTF());
 +    }
 +
 +    /**
 +     * Reconstructs object on unmarshalling.
 +     *
 +     * @return Reconstructed object.
 +     * @throws ObjectStreamException Thrown in case of unmarshalling error.
 +     */
 +    private Object readResolve() throws ObjectStreamException {
 +        try {
 +            IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +            return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
 +        }
 +        finally {
 +            stash.remove();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(GridCacheAtomicLongImpl.class, this);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index c945a06,0000000..efdaac2
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@@ -1,325 -1,0 +1,325 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastructures;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.datastructures.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
- import org.apache.ignite.lang.*;
- import org.apache.ignite.transactions.*;
++import org.apache.ignite.internal.processors.cache.transactions.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
++import org.apache.ignite.lang.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.io.*;
 +import java.util.concurrent.*;
 +
 +import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
 +import static org.apache.ignite.transactions.IgniteTxIsolation.*;
 +
 +/**
 + * Cache atomic reference implementation.
 + */
 +public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicReferenceEx<T>, Externalizable {
 +    /** */
 +    private static final long serialVersionUID = 0L;
 +
 +    /** Deserialization stash. */
 +    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
 +        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
 +            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
 +                return F.t2();
 +            }
 +        };
 +
 +    /** Logger. */
 +    private IgniteLogger log;
 +
 +    /** Atomic reference name. */
 +    private String name;
 +
 +    /** Status.*/
 +    private volatile boolean rmvd;
 +
 +    /** Atomic reference key. */
 +    private GridCacheInternalKey key;
 +
 +    /** Atomic reference projection. */
 +    private CacheProjection<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> atomicView;
 +
 +    /** Cache context. */
 +    private GridCacheContext ctx;
 +
 +    /** Callable for {@link #get} operation */
 +    private final Callable<T> getCall = new Callable<T>() {
 +        @Override public T call() throws Exception {
 +            GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
 +
 +            if (ref == null)
 +                throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
 +
 +            return ref.get();
 +        }
 +    };
 +
 +    /**
 +     * Empty constructor required by {@link Externalizable}.
 +     */
 +    public GridCacheAtomicReferenceImpl() {
 +        // No-op.
 +    }
 +
 +    /**
 +     * Default constructor.
 +     *
 +     * @param name Atomic reference name.
 +     * @param key Atomic reference key.
 +     * @param atomicView Atomic projection.
 +     * @param ctx Cache context.
 +     */
 +    public GridCacheAtomicReferenceImpl(String name,
 +        GridCacheInternalKey key,
 +        CacheProjection<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> atomicView,
 +        GridCacheContext ctx) {
 +        assert key != null;
 +        assert atomicView != null;
 +        assert ctx != null;
 +        assert name != null;
 +
 +        this.ctx = ctx;
 +        this.key = key;
 +        this.atomicView = atomicView;
 +        this.name = name;
 +
 +        log = ctx.gridConfig().getGridLogger().getLogger(getClass());
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String name() {
 +        return name;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public T get() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(getCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void set(T val) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        CU.outTx(internalSet(val), ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean compareAndSet(T expVal, T newVal) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(internalCompareAndSet(wrapperPredicate(expVal), wrapperClosure(newVal)), ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean onRemoved() {
 +        return rmvd = true;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onInvalid(@Nullable Exception err) {
 +        // No-op.
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridCacheInternalKey key() {
 +        return key;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean removed() {
 +        return rmvd;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void close() {
 +        if (rmvd)
 +            return;
 +
 +        try {
 +            ctx.kernalContext().dataStructures().removeAtomicReference(name);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IgniteException(e);
 +        }
 +    }
 +
 +    /**
 +     * Creates a wrapper predicate for the existing value.
 +     *
 +     * @param val Value.
 +     * @return Predicate.
 +     */
 +    private IgnitePredicate<T> wrapperPredicate(final T val) {
 +        return new IgnitePredicate<T>() {
 +            @Override public boolean apply(T e) {
 +                return val != null && val.equals(e);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Creates a wrapper closure for the existing value.
 +     *
 +     * @param val Value.
 +     * @return Closure.
 +     */
 +    private IgniteClosure<T, T> wrapperClosure(final T val) {
 +        return new IgniteClosure<T, T>() {
 +            @Override public T apply(T e) {
 +                return val;
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Method returns callable for execution {@link #set(Object)} operation in async and sync mode.
 +     *
 +     * @param val Value to set in the reference.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Boolean> internalSet(final T val) {
 +        return new Callable<Boolean>() {
 +            @Override public Boolean call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
 +
 +                    if (ref == null)
 +                        throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
 +
 +                    ref.set(val);
 +
 +                    atomicView.put(key, ref);
 +
 +                    tx.commit();
 +
 +                    return true;
 +                }
 +                catch (Error | Exception e) {
 +                    U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Conditionally sets the new value. It will be set if {@code expValPred}
 +     * evaluates to {@code true}; otherwise the transaction is marked rollback-only.
 +     *
 +     * @param expValPred Predicate which should evaluate to {@code true} for value to be set.
 +     * @param newValClos Closure which generates new value.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
 +        final IgniteClosure<T, T> newValClos) {
 +        return new Callable<Boolean>() {
 +            @Override public Boolean call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
 +
 +                    if (ref == null)
 +                        throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
 +
 +                    if (!expValPred.apply(ref.get())) {
 +                        tx.setRollbackOnly();
 +
 +                        return false;
 +                    }
 +                    else {
 +                        ref.set(newValClos.apply(ref.get()));
 +
 +                        atomicView.put(key, ref);
 +
 +                        tx.commit();
 +
 +                        return true;
 +                    }
 +                }
 +                catch (Error | Exception e) {
 +                    U.error(log, "Failed to compare and set value [expValPred=" + expValPred + ", newValClos=" +
 +                        newValClos + ", atomicReference=" + this + ']', e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Check removed status.
 +     *
 +     * @throws IgniteCheckedException If removed.
 +     */
 +    private void checkRemoved() throws IgniteCheckedException {
 +        if (rmvd)
 +            throw new DataStructureRemovedException("Atomic reference was removed from cache: " + name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void writeExternal(ObjectOutput out) throws IOException {
 +        out.writeObject(ctx.kernalContext());
 +        out.writeUTF(name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +        IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +        t.set1((GridKernalContext)in.readObject());
 +        t.set2(in.readUTF());
 +    }
 +
 +    /**
 +     * Reconstructs object on unmarshalling.
 +     *
 +     * @return Reconstructed object.
 +     * @throws ObjectStreamException Thrown in case of unmarshalling error.
 +     */
 +    @SuppressWarnings("unchecked")
 +    private Object readResolve() throws ObjectStreamException {
 +        try {
 +            IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +            return t.get1().dataStructures().atomicReference(t.get2(), null, false);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
 +        }
 +        finally {
 +            stash.remove();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(GridCacheAtomicReferenceImpl.class, this);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 2468289,0000000..c66a890
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@@ -1,536 -1,0 +1,536 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastructures;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.datastructures.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
- import org.apache.ignite.lang.*;
- import org.apache.ignite.transactions.*;
++import org.apache.ignite.internal.processors.cache.transactions.*;
++import org.apache.ignite.internal.util.future.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
- import org.apache.ignite.internal.util.future.*;
++import org.apache.ignite.lang.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.io.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.*;
 +import java.util.concurrent.locks.*;
 +
 +import static java.util.concurrent.TimeUnit.*;
 +import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
 +import static org.apache.ignite.transactions.IgniteTxIsolation.*;
 +
 +/**
 + * Cache sequence implementation.
 + */
 +public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenceEx, Externalizable {
 +    /** */
 +    private static final long serialVersionUID = 0L;
 +
 +    /** De-serialization stash. */
 +    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
 +        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
 +            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
 +                return F.t2();
 +            }
 +        };
 +
 +    /** Logger. */
 +    private IgniteLogger log;
 +
 +    /** Sequence name. */
 +    private String name;
 +
 +    /** Removed flag. */
 +    private volatile boolean rmvd;
 +
 +    /** Sequence key. */
 +    private GridCacheInternalKey key;
 +
 +    /** Sequence projection. */
 +    private CacheProjection<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView;
 +
 +    /** Cache context. */
 +    private volatile GridCacheContext ctx;
 +
 +    /** Local value of sequence. */
 +    private long locVal;
 +
 +    /** Upper bound of local counter. */
 +    private long upBound;
 +
 +    /** Sequence batch size. */
 +    private volatile int batchSize;
 +
 +    /** Synchronization lock. */
 +    private final Lock lock = new ReentrantLock();
 +
 +    /** Await condition. */
 +    private Condition cond = lock.newCondition();
 +
 +    /** Callable for execution {@link #incrementAndGet} operation in async and sync mode.  */
 +    private final Callable<Long> incAndGetCall = internalUpdate(1, true);
 +
 +    /** Callable for execution {@link #getAndIncrement} operation in async and sync mode.  */
 +    private final Callable<Long> getAndIncCall = internalUpdate(1, false);
 +
 +    /** Add and get cache call guard. */
 +    private final AtomicBoolean updateGuard = new AtomicBoolean();
 +
 +    /**
 +     * Empty constructor required by {@link Externalizable}.
 +     */
 +    public GridCacheAtomicSequenceImpl() {
 +        // No-op.
 +    }
 +
 +    /**
 +     * Creates a sequence with the given name, key, cache projection and initial local range.
 +     *
 +     * @param name Sequence name.
 +     * @param key Sequence key.
 +     * @param seqView Sequence projection.
 +     * @param ctx CacheContext.
 +     * @param batchSize Sequence batch size.
 +     * @param locVal Local counter.
 +     * @param upBound Upper bound.
 +     */
 +    public GridCacheAtomicSequenceImpl(String name,
 +        GridCacheInternalKey key,
 +        CacheProjection<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView,
 +        GridCacheContext ctx,
 +        int batchSize,
 +        long locVal,
 +        long upBound)
 +    {
 +        assert key != null;
 +        assert seqView != null;
 +        assert ctx != null;
 +        assert locVal <= upBound;
 +
 +        this.batchSize = batchSize;
 +        this.ctx = ctx;
 +        this.key = key;
 +        this.seqView = seqView;
 +        this.upBound = upBound;
 +        this.locVal = locVal;
 +        this.name = name;
 +
 +        log = ctx.gridConfig().getGridLogger().getLogger(getClass());
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String name() {
 +        return name;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long get() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        lock.lock();
 +
 +        try {
 +            return locVal;
 +        }
 +        finally {
 +            lock.unlock();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long incrementAndGet() throws IgniteCheckedException {
 +        return internalUpdate(1, incAndGetCall, true);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long getAndIncrement() throws IgniteCheckedException {
 +        return internalUpdate(1, getAndIncCall, false);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long addAndGet(long l) throws IgniteCheckedException {
 +        A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 +
 +        return internalUpdate(l, null, true);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public long getAndAdd(long l) throws IgniteCheckedException {
 +        A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 +
 +        return internalUpdate(l, null, false);
 +    }
 +
 +    /**
 +     * Synchronous sequence update operation. Will add given amount to the sequence value.
 +     *
 +     * @param l Increment amount.
 +     * @param updateCall Cache call that will update sequence reservation count in accordance with l.
 +     * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
 +     *      prior to update.
 +     * @return Sequence value.
 +     * @throws IgniteCheckedException If update failed.
 +     */
 +    @SuppressWarnings("SignalWithoutCorrespondingAwait")
 +    private long internalUpdate(long l, @Nullable Callable<Long> updateCall, boolean updated) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        assert l > 0;
 +
 +        // Fast path: serve the request from the locally held range without touching the cache.
 +        lock.lock();
 +
 +        try {
 +            // If reserved range isn't exhausted.
 +            if (locVal + l <= upBound) {
 +                long curVal = locVal;
 +
 +                locVal += l;
 +
 +                return updated ? locVal : curVal;
 +            }
 +        }
 +        finally {
 +            lock.unlock();
 +        }
 +
 +        // No pre-built callable was supplied (arbitrary increment), so build one for this request.
 +        if (updateCall == null)
 +            updateCall = internalUpdate(l, updated);
 +
 +        while (true) {
 +            // Only the thread that wins this CAS runs the cache update; other threads take the else-branch.
 +            if (updateGuard.compareAndSet(false, true)) {
 +                try {
 +                    // This call must be outside lock.
 +                    return CU.outTx(updateCall, ctx);
 +                }
 +                finally {
 +                    // Reset the guard and signal waiting threads while holding the lock.
 +                    lock.lock();
 +
 +                    try {
 +                        updateGuard.set(false);
 +
 +                        cond.signalAll();
 +                    }
 +                    finally {
 +                        lock.unlock();
 +                    }
 +                }
 +            }
 +            else {
 +                lock.lock();
 +
 +                try {
 +                    // Wait while the local range is used up and another thread still holds the update guard.
 +                    // NOTE(review): the predicate does not take l into account, so when
 +                    // locVal < upBound < locVal + l this thread does not wait here and keeps
 +                    // re-looping until the guard is released - confirm this is acceptable.
 +                    while (locVal >= upBound && updateGuard.get())
 +                        U.await(cond, 500, MILLISECONDS);
 +
 +                    // The sequence may have been removed while this thread was waiting.
 +                    checkRemoved();
 +
 +                    // If reserved range isn't exhausted.
 +                    if (locVal + l <= upBound) {
 +                        long curVal = locVal;
 +
 +                        locVal += l;
 +
 +                        return updated ? locVal : curVal;
 +                    }
 +                    // Otherwise fall through and retry: either win the guard or wait again.
 +                }
 +                finally {
 +                    lock.unlock();
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Asynchronous sequence update operation. Will add given amount to the sequence value.
 +     *
 +     * @param l Increment amount.
 +     * @param updateCall Cache call that will update sequence reservation count in accordance with l.
 +     * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
 +     *      prior to update.
 +     * @return Future indicating sequence value.
 +     * @throws IgniteCheckedException If update failed.
 +     */
 +    @SuppressWarnings("SignalWithoutCorrespondingAwait")
-     private IgniteFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated)
++    private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated)
 +        throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
 +
 +        // Fast path: serve the request from the locally held range and return an already finished future.
 +        lock.lock();
 +
 +        try {
 +            // If reserved range isn't exhausted.
 +            if (locVal + l <= upBound) {
 +                long curVal = locVal;
 +
 +                locVal += l;
 +
 +                return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal);
 +            }
 +        }
 +        finally {
 +            lock.unlock();
 +        }
 +
 +        // No pre-built callable was supplied (arbitrary increment), so build one for this request.
 +        if (updateCall == null)
 +            updateCall = internalUpdate(l, updated);
 +
 +        while (true) {
 +            // Only the thread that wins this CAS submits the cache update; other threads take the else-branch.
 +            if (updateGuard.compareAndSet(false, true)) {
 +                try {
 +                    // This call must be outside lock.
 +                    return ctx.closures().callLocalSafe(updateCall, true);
 +                }
 +                finally {
 +                    // NOTE(review): this finally runs as soon as callLocalSafe has returned its future.
 +                    // If the callable is executed asynchronously, the guard is reset before the
 +                    // update has actually completed - confirm this is intended.
 +                    lock.lock();
 +
 +                    try {
 +                        updateGuard.set(false);
 +
 +                        cond.signalAll();
 +                    }
 +                    finally {
 +                        lock.unlock();
 +                    }
 +                }
 +            }
 +            else {
 +                lock.lock();
 +
 +                try {
 +                    // Wait while the local range is used up and another thread still holds the update guard.
 +                    while (locVal >= upBound && updateGuard.get())
 +                        U.await(cond, 500, MILLISECONDS);
 +
 +                    // The sequence may have been removed while this thread was waiting.
 +                    checkRemoved();
 +
 +                    // If reserved range isn't exhausted.
 +                    if (locVal + l <= upBound) {
 +                        long curVal = locVal;
 +
 +                        locVal += l;
 +
 +                        return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal);
 +                    }
 +                    // Otherwise fall through and retry: either win the guard or wait again.
 +                }
 +                finally {
 +                    lock.unlock();
 +                }
 +            }
 +        }
 +    }
 +
 +    /** Gets the local batch size of this sequence.
 +     *
 +     * @return Sequence batch size.
 +     */
 +    @Override public int batchSize() {
 +        return batchSize;
 +    }
 +
 +    /**
 +     * Sets the local batch size of this sequence.
 +     *
 +     * @param size Sequence batch size. Must be greater than 0.
 +     */
 +    @Override public void batchSize(int size) {
 +        // Zero is rejected too, so the message states the real constraint (size > 0).
 +        A.ensure(size > 0, " Batch size must be greater than 0: " + size);
 +
 +        // The field is volatile, but it is still written under the lock that guards the local range.
 +        lock.lock();
 +
 +        try {
 +            batchSize = size;
 +        }
 +        finally {
 +            lock.unlock();
 +        }
 +    }
 +
 +    /**
 +     * Check removed status.
 +     *
 +     * @throws IgniteCheckedException If removed.
 +     */
 +    private void checkRemoved() throws IgniteCheckedException {
 +        if (rmvd)
 +            throw new DataStructureRemovedException("Sequence was removed from cache: " + name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean onRemoved() {
 +        return rmvd = true;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onInvalid(@Nullable Exception err) {
 +        // No-op.
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridCacheInternalKey key() {
 +        return key;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean removed() {
 +        return rmvd;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void close() {
 +        // Nothing to do when the sequence has already been flagged as removed.
 +        if (rmvd)
 +            return;
 +
 +        try {
 +            ctx.kernalContext().dataStructures().removeSequence(name);
 +        }
 +        catch (IgniteCheckedException err) {
 +            // Re-throw as the unchecked exception, keeping the original as the cause.
 +            throw new IgniteException(err);
 +        }
 +    }
 +
 +    /**
 +     * Method returns callable for execution all update operations in async and sync mode.
 +     *
 +     * @param l Value will be added to sequence.
 +     * @param updated If {@code true}, will return updated value, if {@code false}, will return previous value.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    @SuppressWarnings("TooBroadScope")
 +    private Callable<Long> internalUpdate(final long l, final boolean updated) {
 +        return new Callable<Long>() {
 +            @Override public Long call() throws Exception {
-                 IgniteTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ);
++                IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ);
 +
 +                try {
 +                    GridCacheAtomicSequenceValue seq = seqView.get(key);
 +
 +                    checkRemoved();
 +
 +                    assert seq != null;
 +
 +                    long curLocVal;
 +
 +                    long newUpBound;
 +
 +                    lock.lock();
 +
 +                    try {
 +                        curLocVal = locVal;
 +
 +                        // If local range was already reserved in another thread.
 +                        if (locVal + l <= upBound) {
 +                            long retVal = locVal;
 +
 +                            locVal += l;
 +
 +                            return updated ? locVal : retVal;
 +                        }
 +
 +                        long curGlobalVal = seq.get();
 +
 +                        long newLocVal;
 +
 +                        /* We should use offset because we already reserved left side of range.*/
 +                        long off = batchSize > 1 ? batchSize - 1 : 1;
 +
 +                        // Calculate new values for local counter, global counter and upper bound.
 +                        if (curLocVal + l >= curGlobalVal) {
 +                            newLocVal = curLocVal + l;
 +
 +                            newUpBound = newLocVal + off;
 +                        }
 +                        else {
 +                            newLocVal = curGlobalVal;
 +
 +                            newUpBound = newLocVal + off;
 +                        }
 +
 +                        locVal = newLocVal;
 +                        upBound = newUpBound;
 +
 +                        if (updated)
 +                            curLocVal = newLocVal;
 +                    }
 +                    finally {
 +                        lock.unlock();
 +                    }
 +
 +                    // Global counter must be more than reserved upper bound.
 +                    seq.set(newUpBound + 1);
 +
 +                    seqView.put(key, seq);
 +
 +                    tx.commit();
 +
 +                    return curLocVal;
 +                }
 +                catch (Error | Exception e) {
 +                    U.error(log, "Failed to get and add: " + this, e);
 +
 +                    throw e;
 +                } finally {
 +                    tx.close();
 +                }
 +            }
 +        };
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void writeExternal(ObjectOutput out) throws IOException {
 +        out.writeObject(ctx.kernalContext());
 +        out.writeUTF(name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +        IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +        t.set1((GridKernalContext)in.readObject());
 +        t.set2(in.readUTF());
 +    }
 +
 +    /**
 +     * Reconstructs object on unmarshalling.
 +     *
 +     * @return Reconstructed object.
 +     * @throws ObjectStreamException Thrown in case of unmarshalling error.
 +     */
 +    private Object readResolve() throws ObjectStreamException {
 +        try {
 +            IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +            return t.get1().dataStructures().sequence(t.get2(), 0L, false);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
 +        }
 +        finally {
 +            stash.remove();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(GridCacheAtomicSequenceImpl.class, this);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c6ca063f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 9605381,0000000..db73ebb
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@@ -1,355 -1,0 +1,355 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.datastructures;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.datastructures.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
++import org.apache.ignite.internal.processors.cache.transactions.*;
 +import org.apache.ignite.internal.util.*;
- import org.apache.ignite.lang.*;
- import org.apache.ignite.transactions.*;
 +import org.apache.ignite.internal.util.tostring.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
++import org.apache.ignite.lang.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.io.*;
 +import java.util.concurrent.*;
 +
 +import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
 +import static org.apache.ignite.transactions.IgniteTxIsolation.*;
 +
 +/**
 + * Cache atomic stamped implementation.
 + */
 +public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicStampedEx<T, S>, Externalizable {
 +    /** */
 +    private static final long serialVersionUID = 0L;
 +
 +    /** Deserialization stash. */
 +    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
 +        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
 +            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
 +                return F.t2();
 +            }
 +        };
 +
 +    /** Logger. */
 +    private IgniteLogger log;
 +
 +    /** Atomic stamped name. */
 +    private String name;
 +
 +    /** Removed flag.*/
 +    private volatile boolean rmvd;
 +
 +    /** Atomic stamped key. */
 +    private GridCacheInternalKey key;
 +
 +    /** Atomic stamped projection. */
 +    private CacheProjection<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView;
 +
 +    /** Cache context. */
 +    private GridCacheContext ctx;
 +
 +    /** Callable for {@link #get()} operation */
 +    private final Callable<IgniteBiTuple<T, S>> getCall = new Callable<IgniteBiTuple<T, S>>() {
 +        @Override public IgniteBiTuple<T, S> call() throws Exception {
 +            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
 +
 +            if (stmp == null)
 +                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
 +
 +            return stmp.get();
 +        }
 +    };
 +
 +    /** Callable for {@link #value()} operation */
 +    private final Callable<T> valCall = new Callable<T>() {
 +        @Override public T call() throws Exception {
 +            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
 +
 +            if (stmp == null)
 +                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
 +
 +            return stmp.value();
 +        }
 +    };
 +
 +    /** Callable for {@link #stamp()} operation */
 +    private final Callable<S> stampCall = new Callable<S>() {
 +        @Override public S call() throws Exception {
 +            GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
 +
 +            if (stmp == null)
 +                throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
 +
 +            return stmp.stamp();
 +        }
 +    };
 +
 +    /**
 +     * Empty constructor required by {@link Externalizable}.
 +     */
 +    public GridCacheAtomicStampedImpl() {
 +        // No-op.
 +    }
 +
 +    /**
 +     * Creates an atomic stamped with the given name, key, cache projection and cache context.
 +     *
 +     * @param name Atomic stamped name.
 +     * @param key Atomic stamped key.
 +     * @param atomicView Atomic projection.
 +     * @param ctx Cache context.
 +     */
 +    public GridCacheAtomicStampedImpl(String name, GridCacheInternalKey key, CacheProjection<GridCacheInternalKey,
 +            GridCacheAtomicStampedValue<T, S>> atomicView, GridCacheContext ctx) {
 +        assert key != null;
 +        assert atomicView != null;
 +        assert ctx != null;
 +        assert name != null;
 +
 +        this.ctx = ctx;
 +        this.key = key;
 +        this.atomicView = atomicView;
 +        this.name = name;
 +
 +        log = ctx.gridConfig().getGridLogger().getLogger(getClass());
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String name() {
 +        return name;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public IgniteBiTuple<T, S> get() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(getCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void set(T val, S stamp) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        CU.outTx(internalSet(val, stamp), ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean compareAndSet(T expVal, T newVal, S expStamp, S newStamp) throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(internalCompareAndSet(F0.equalTo(expVal), wrapperClosure(newVal),
 +            F0.equalTo(expStamp), wrapperClosure(newStamp)), ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public S stamp() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(stampCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public T value() throws IgniteCheckedException {
 +        checkRemoved();
 +
 +        return CU.outTx(valCall, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean onRemoved() {
 +        return rmvd = true;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void onInvalid(@Nullable Exception err) {
 +        // No-op.
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridCacheInternalKey key() {
 +        return key;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean removed() {
 +        return rmvd;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void close() {
 +        if (rmvd)
 +            return;
 +
 +        try {
 +            ctx.kernalContext().dataStructures().removeAtomicStamped(name);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IgniteException(e);
 +        }
 +    }
 +
 +    /**
 +     * Creates a closure that ignores its argument and always returns the given value.
 +     *
 +     * @param val Value to return from the closure.
 +     * @return Closure.
 +     */
 +    private static <N> IgniteClosure<N, N> wrapperClosure(final N val) {
 +        // Static on purpose: the anonymous closure then holds no hidden reference to the enclosing instance.
 +        return new IgniteClosure<N, N>() {
 +            @Override public N apply(N e) {
 +                return val;
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Method returns callable for execution {@link #set(Object,Object)} operation in async and sync mode.
 +     * The callable reads the current value, overwrites it and commits, all inside an internal transaction.
 +     *
 +     * @param val Value will be set in the atomic stamped.
 +     * @param stamp Stamp will be set in the atomic stamped.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Boolean> internalSet(final T val, final S stamp) {
 +        return new Callable<Boolean>() {
 +            @Override public Boolean call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
 +
 +                    if (stmp == null)
 +                        throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
 +
 +                    stmp.set(val, stamp);
 +
 +                    atomicView.put(key, stmp);
 +
 +                    tx.commit();
 +
 +                    return true;
 +                }
 +                catch (Error | Exception e) {
 +                    // A plain 'this' here is the anonymous Callable; log the enclosing atomic stamped instead.
 +                    U.error(log, "Failed to set [val=" + val + ", stamp=" + stamp + ", atomicStamped=" +
 +                        GridCacheAtomicStampedImpl.this + ']', e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Method returns callable that conditionally sets the new value and new stamp. They will be set if
 +     * {@code expValPred} and {@code expStampPred} both evaluate to {@code true}; otherwise the
 +     * transaction is marked rollback-only and the callable returns {@code false}.
 +     *
 +     * @param expValPred Predicate which should evaluate to {@code true} for value to be set
 +     * @param newValClos Closure generates new value.
 +     * @param expStampPred Predicate which should evaluate to {@code true} for value to be set
 +     * @param newStampClos Closure generates new stamp value.
 +     * @return Callable for execution in async and sync mode.
 +     */
 +    private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
 +        final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred,
 +        final IgniteClosure<S, S> newStampClos) {
 +        return new Callable<Boolean>() {
 +            @Override public Boolean call() throws Exception {
-                 try (IgniteTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
++                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
 +                    GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
 +
 +                    if (stmp == null)
 +                        throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
 +
 +                    if (!(expValPred.apply(stmp.value()) && expStampPred.apply(stmp.stamp()))) {
 +                        tx.setRollbackOnly();
 +
 +                        return false;
 +                    }
 +                    else {
 +                        stmp.set(newValClos.apply(stmp.value()), newStampClos.apply(stmp.stamp()));
 +
 +                        atomicView.put(key, stmp);
 +
 +                        tx.commit();
 +
 +                        return true;
 +                    }
 +                }
 +                catch (Error | Exception e) {
 +                    // A plain 'this' here is the anonymous Callable; log the enclosing atomic stamped instead.
 +                    U.error(log, "Failed to compare and set [expValPred=" + expValPred + ", newValClos=" +
 +                        newValClos + ", expStampPred=" + expStampPred + ", newStampClos=" + newStampClos +
 +                        ", atomicStamped=" + GridCacheAtomicStampedImpl.this + ']', e);
 +
 +                    throw e;
 +                }
 +            }
 +        };
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void writeExternal(ObjectOutput out) throws IOException {
 +        out.writeObject(ctx.kernalContext());
 +        out.writeUTF(name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +        IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +        t.set1((GridKernalContext)in.readObject());
 +        t.set2(in.readUTF());
 +    }
 +
 +    /**
 +     * Reconstructs object on unmarshalling.
 +     *
 +     * @return Reconstructed object.
 +     * @throws ObjectStreamException Thrown in case of unmarshalling error.
 +     */
 +    @SuppressWarnings("unchecked")
 +    private Object readResolve() throws ObjectStreamException {
 +        try {
 +            IgniteBiTuple<GridKernalContext, String> t = stash.get();
 +
 +            return t.get1().dataStructures().atomicStamped(t.get2(), null, null, false);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
 +        }
 +        finally {
 +            stash.remove();
 +        }
 +    }
 +
 +    /**
 +     * Check removed status.
 +     *
 +     * @throws IgniteCheckedException If removed.
 +     */
 +    private void checkRemoved() throws IgniteCheckedException {
 +        if (rmvd)
 +            throw new DataStructureRemovedException("Atomic stamped was removed from cache: " + name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return GridToStringBuilder.toString(GridCacheAtomicStampedImpl.class, this);
 +    }
 +}


Mime
View raw message