ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [09/16] incubator-ignite git commit: # ignite-60
Date Mon, 26 Jan 2015 14:18:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetItemKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetItemKey.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetItemKey.java
deleted file mode 100644
index 60aa5db..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetItemKey.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Set item key.
- */
-public class GridCacheSetItemKey implements GridCacheInternal, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private IgniteUuid setId;
-
-    /** */
-    @GridToStringInclude
-    private Object item;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheSetItemKey() {
-        // No-op.
-    }
-
-    /**
-     * @param setId Set unique ID.
-     * @param item Set item.
-     */
-    GridCacheSetItemKey(IgniteUuid setId, Object item) {
-        this.setId = setId;
-        this.item = item;
-    }
-
-    /**
-     * @return Set UUID.
-     */
-    public IgniteUuid setId() {
-        return setId;
-    }
-
-    /**
-     * @return Set item.
-     */
-    public Object item() {
-        return item;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int result = setId.hashCode();
-
-        result = 31 * result + item.hashCode();
-
-        return result;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridCacheSetItemKey that = (GridCacheSetItemKey)o;
-
-        return setId.equals(that.setId) && item.equals(that.item);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeGridUuid(out, setId);
-        out.writeObject(item);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-        setId = U.readGridUuid(in);
-        item = in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheSetItemKey.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetProxy.java
deleted file mode 100644
index 01d1776..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetProxy.java
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.datastructures.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Cache set proxy.
- */
-public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Deserialization stash. */
-    private static final ThreadLocal<IgniteBiTuple<GridCacheContext, String>>
stash =
-        new ThreadLocal<IgniteBiTuple<GridCacheContext, String>>() {
-            @Override protected IgniteBiTuple<GridCacheContext, String> initialValue()
{
-                return F.t2();
-            }
-        };
-
-    /** Delegate set. */
-    private GridCacheSetImpl<T> delegate;
-
-    /** Cache context. */
-    private GridCacheContext cctx;
-
-    /** Cache gateway. */
-    private GridCacheGateway gate;
-
-    /** Busy lock. */
-    private GridSpinBusyLock busyLock;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridCacheSetProxy() {
-        // No-op.
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param delegate Delegate set.
-     */
-    public GridCacheSetProxy(GridCacheContext cctx, GridCacheSetImpl<T> delegate) {
-        this.cctx = cctx;
-        this.delegate = delegate;
-
-        gate = cctx.gate();
-
-        busyLock = new GridSpinBusyLock();
-    }
-
-    /**
-     * Remove callback.
-     */
-    void blockOnRemove() {
-        delegate.removed(true);
-
-        busyLock.block();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Integer>() {
-                        @Override public Integer call() throws Exception {
-                            return delegate.size();
-                        }
-                    }, cctx);
-
-                return delegate.size();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isEmpty() {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.isEmpty();
-                        }
-                    }, cctx);
-
-                return delegate.isEmpty();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean contains(final Object o) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.contains(o);
-                        }
-                    }, cctx);
-
-                return delegate.contains(o);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @NotNull @Override public Object[] toArray() {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Object[]>() {
-                        @Override public Object[] call() throws Exception {
-                            return delegate.toArray();
-                        }
-                    }, cctx);
-
-                return delegate.toArray();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @NotNull @Override public <T1> T1[] toArray(final T1[] a) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<T1[]>() {
-                        @Override public T1[] call() throws Exception {
-                            return delegate.toArray(a);
-                        }
-                    }, cctx);
-
-                return delegate.toArray(a);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean add(final T t) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.add(t);
-                        }
-                    }, cctx);
-
-                return delegate.add(t);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean remove(final Object o) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.remove(o);
-                        }
-                    }, cctx);
-
-                return delegate.remove(o);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean containsAll(final Collection<?> c) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.containsAll(c);
-                        }
-                    }, cctx);
-
-                return delegate.containsAll(c);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean addAll(final Collection<? extends T> c) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.addAll(c);
-                        }
-                    }, cctx);
-
-                return delegate.addAll(c);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean retainAll(final Collection<?> c) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.retainAll(c);
-                        }
-                    }, cctx);
-
-                return delegate.retainAll(c);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean removeAll(final Collection<?> c) {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Boolean>() {
-                        @Override public Boolean call() throws Exception {
-                            return delegate.removeAll(c);
-                        }
-                    }, cctx);
-
-                return delegate.removeAll(c);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clear() {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional()) {
-                    CU.outTx(new Callable<Void>() {
-                        @Override public Void call() throws Exception {
-                            delegate.clear();
-
-                            return null;
-                        }
-                    }, cctx);
-                }
-                else
-                    delegate.clear();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<T> iterator() {
-        enterBusy();
-
-        try {
-            gate.enter();
-
-            try {
-                if (cctx.transactional())
-                    return CU.outTx(new Callable<Iterator<T>>() {
-                        @Override public Iterator<T> call() throws Exception {
-                            return delegate.iterator();
-                        }
-                    }, cctx);
-
-                return delegate.iterator();
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-            finally {
-                gate.leave();
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return delegate.name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean collocated() throws IgniteCheckedException {
-        return delegate.collocated();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean removed() {
-        return delegate.removed();
-    }
-
-    /**
-     * Enters busy state.
-     */
-    private void enterBusy() {
-        if (!busyLock.enterBusy())
-            throw new CacheDataStructureRemovedRuntimeException("Set has been removed from
cache: " + delegate);
-    }
-
-    /**
-     * Leaves busy state.
-     */
-    private void leaveBusy() {
-        busyLock.leaveBusy();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(cctx);
-        U.writeString(out, name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-        IgniteBiTuple<GridCacheContext, String> t = stash.get();
-
-        t.set1((GridCacheContext)in.readObject());
-        t.set2(U.readString(in));
-    }
-
-    /**
-     * Reconstructs object on unmarshalling.
-     *
-     * @return Reconstructed object.
-     * @throws ObjectStreamException Thrown in case of unmarshalling error.
-     */
-    protected Object readResolve() throws ObjectStreamException {
-        try {
-            IgniteBiTuple<GridCacheContext, String> t = stash.get();
-
-            return t.get1().dataStructures().set(t.get2(), false, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
-        }
-        finally {
-            stash.remove();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return delegate.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridSetQueryPredicate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridSetQueryPredicate.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridSetQueryPredicate.java
deleted file mode 100644
index 7307e9d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridSetQueryPredicate.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Predicate for query over {@link org.apache.ignite.cache.datastructures.IgniteSet} items.
- */
-public class GridSetQueryPredicate<K, V> implements IgniteBiPredicate<K, V>,
Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private IgniteUuid setId;
-
-    /** */
-    private boolean collocated;
-
-    /** */
-    private GridCacheContext ctx;
-
-    /** */
-    private boolean filter;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public GridSetQueryPredicate() {
-        // No-op.
-    }
-
-    /**
-     * @param setId Set ID.
-     * @param collocated Collocation flag.
-     */
-    public GridSetQueryPredicate(IgniteUuid setId, boolean collocated) {
-        this.setId = setId;
-        this.collocated = collocated;
-    }
-
-    /**
-     * @param ctx Cache context.
-     */
-    public void init(GridCacheContext ctx) {
-        this.ctx = ctx;
-
-        filter = filterKeys();
-    }
-
-    /**
-     *
-     * @return Collocation flag.
-     */
-    public boolean collocated() {
-        return collocated;
-    }
-
-    /**
-     * @return Set ID.
-     */
-    public IgniteUuid setId() {
-        return setId;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public boolean apply(K k, V v) {
-        return !filter || ctx.affinity().primary(ctx.localNode(), k, ctx.affinity().affinityTopologyVersion());
-    }
-
-    /**
-     * @return {@code True} if need to filter out non-primary keys during processing of set
data query.
-     */
-    private boolean filterKeys() {
-        return !collocated && !(ctx.isLocal() || ctx.isReplicated()) &&
-            (ctx.config().getBackups() > 0 || CU.isNearEnabled(ctx));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeGridUuid(out, setId);
-        out.writeBoolean(collocated);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-        setId = U.readGridUuid(in);
-        collocated = in.readBoolean();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridSetQueryPredicate.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java
deleted file mode 100644
index d10aa5a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridTransactionalCacheQueueImpl.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.datastructures;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.transactions.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import java.util.*;
-
-import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
-import static org.apache.ignite.transactions.IgniteTxIsolation.*;
-
-/**
- * {@link org.apache.ignite.cache.datastructures.IgniteQueue} implementation using transactional
cache.
- */
-public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
{
-    /** */
-    private final IgniteTransactions txs;
-
-    /**
-     * @param queueName Queue name.
-     * @param hdr Queue header.
-     * @param cctx Cache context.
-     */
-    public GridTransactionalCacheQueueImpl(String queueName, GridCacheQueueHeader hdr, GridCacheContext<?,
?> cctx) {
-        super(queueName, hdr, cctx);
-
-        txs = cctx.kernalContext().grid().transactions();
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public boolean offer(final T item) throws IgniteException {
-        A.notNull(item, "item");
-
-        try {
-            boolean retVal;
-
-            int cnt = 0;
-
-            while (true) {
-                try {
-                    try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                        Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1));
-
-                        if (idx != null) {
-                            checkRemoved(idx);
-
-                            cache.put(itemKey(idx), item);
-
-                            retVal = true;
-                        }
-                        else
-                            retVal = false;
-
-                        tx.commit();
-
-                        break;
-                    }
-                }
-                catch (CacheException e) {
-                    if (e.getCause() instanceof ClusterGroupEmptyException)
-                        throw e;
-
-                    if (e.getCause() instanceof ClusterTopologyException) {
-                        if (cnt++ == MAX_UPDATE_RETRIES)
-                            throw e;
-                        else {
-                            U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                            U.sleep(RETRY_DELAY);
-                        }
-                    }
-                    else
-                        throw e;
-                }
-            }
-
-            return retVal;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Nullable @Override public T poll() throws IgniteException {
-        try {
-            int cnt = 0;
-
-            T retVal;
-
-            while (true) {
-                try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id));
-
-                    if (idx != null) {
-                        checkRemoved(idx);
-
-                        retVal = (T)cache.getAndRemove(itemKey(idx));
-
-                        assert retVal != null;
-                    }
-                    else
-                        retVal = null;
-
-                    tx.commit();
-
-                    break;
-                }
-                catch (CacheException e) {
-                    if (e.getCause() instanceof ClusterGroupEmptyException)
-                        throw e;
-
-                    if (e.getCause() instanceof ClusterTopologyException) {
-                        if (cnt++ == MAX_UPDATE_RETRIES)
-                            throw e;
-                        else {
-                            U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                            U.sleep(RETRY_DELAY);
-                        }
-                    }
-                    else
-                        throw e;
-                }
-            }
-
-            return retVal;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public boolean addAll(final Collection<? extends T> items) {
-        A.notNull(items, "items");
-
-        try {
-            boolean retVal;
-
-            int cnt = 0;
-
-            while (true) {
-                try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size()));
-
-                    if (idx != null) {
-                        checkRemoved(idx);
-
-                        Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
-
-                        for (T item : items) {
-                            putMap.put(itemKey(idx), item);
-
-                            idx++;
-                        }
-
-                        cache.putAll(putMap);
-
-                        retVal = true;
-                    }
-                    else
-                        retVal = false;
-
-                    tx.commit();
-
-                    break;
-                }
-                catch (CacheException e) {
-                    if (e.getCause() instanceof ClusterGroupEmptyException)
-                        throw e;
-
-                    if (e.getCause() instanceof ClusterTopologyException) {
-                        if (cnt++ == MAX_UPDATE_RETRIES)
-                            throw e;
-                        else {
-                            U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                            U.sleep(RETRY_DELAY);
-                        }
-                    }
-                    else
-                        throw e;
-                }
-            }
-
-            return retVal;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException
{
-        try {
-            int cnt = 0;
-
-            while (true) {
-                try (IgniteTx tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx));
-
-                    if (idx != null) {
-                        checkRemoved(idx);
-
-                        boolean rmv = cache.remove(itemKey(idx));
-
-                        assert rmv;
-                    }
-
-                    tx.commit();
-
-                    break;
-                }
-                catch (CacheException e) {
-                    if (e.getCause() instanceof ClusterGroupEmptyException)
-                        throw e;
-
-                    if (e.getCause() instanceof ClusterTopologyException) {
-                        if (cnt++ == MAX_UPDATE_RETRIES)
-                            throw e;
-                        else {
-                            U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                            U.sleep(RETRY_DELAY);
-                        }
-                    }
-                    else
-                        throw e;
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c10a6c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index ae5fe77..094002d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -24,13 +24,13 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.cache.datastructures.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.task.*;


Mime
View raw message