ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [2/2] incubator-ignite git commit: # IGNITE-289 (Need to get rid of locPart reference in DHT cache entry) Store local partitions in array.
Date Thu, 07 May 2015 16:28:07 GMT
# IGNITE-289 (Need to get rid of locPart reference in DHT cache entry) Store local partitions
in array.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/12fa8894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/12fa8894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/12fa8894

Branch: refs/heads/ignite-289
Commit: 12fa88943e133ad54f7a70b654952482f2f267cf
Parents: 8b61e1c
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Thu May 7 19:27:50 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Thu May 7 19:27:50 2015 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopology.java           |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  35 +++--
 .../distributed/near/GridNearCacheAdapter.java  |  64 ++++-----
 .../internal/util/RarefiedConcurrentIntMap.java | 139 +++++++++++++++++++
 .../ignite/internal/util/lang/GridFunc.java     |  19 +++
 5 files changed, 214 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..bad5efb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -121,7 +121,7 @@ public interface GridDhtPartitionTopology {
      *
      * @return All current local partitions.
      */
-    public Collection<GridDhtLocalPartition> currentLocalPartitions();
+    public Iterable<GridDhtLocalPartition> currentLocalPartitions();
 
     /**
      * @return Local IDs.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 073e0e7..e2f77de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -28,10 +28,8 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
-import org.jsr166.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.EventType.*;
@@ -55,8 +53,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
     private final IgniteLogger log;
 
     /** */
-    private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts =
-        new ConcurrentHashMap8<>();
+    private final RarefiedConcurrentIntMap<GridDhtLocalPartition> locParts;
 
     /** Node to partition map. */
     private GridDhtPartitionFullMap node2part;
@@ -90,6 +87,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
 
         this.cctx = cctx;
 
+        locParts = new RarefiedConcurrentIntMap<>(cctx.affinity().partitions());
+
         log = cctx.logger(getClass());
     }
 
@@ -120,7 +119,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
         boolean changed = false;
 
         // Synchronously wait for all renting partitions to complete.
-        for (Iterator<GridDhtLocalPartition> it = locParts.values().iterator(); it.hasNext();)
{
+        for (Iterator<GridDhtLocalPartition> it = locParts.iterator(); it.hasNext();)
{
             GridDhtLocalPartition p = it.next();
 
             GridDhtPartitionState state = p.state();
@@ -557,12 +556,16 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
 
     /** {@inheritDoc} */
     @Override public List<GridDhtLocalPartition> localPartitions() {
-        return new LinkedList<>(locParts.values());
+        List<GridDhtLocalPartition> res = new LinkedList<>();
+
+        locParts.addAllTo(res);
+
+        return res;
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() {
-        return locParts.values();
+    @Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() {
+        return locParts;
     }
 
     /** {@inheritDoc} */
@@ -603,8 +606,16 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
         lock.readLock().lock();
 
         try {
-            return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(),
-                F.viewReadOnly(locParts, CU.<K, V>part2state()), true);
+            GridDhtPartitionMap res = new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get());
+
+            for (int i = 0; i < locParts.maxIndex(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
+
+                if (part.state().active())
+                    res.put(i, part.state());
+            }
+
+            return res;
         }
         finally {
             lock.readLock().unlock();
@@ -950,7 +961,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
 
         UUID locId = cctx.nodeId();
 
-        for (GridDhtLocalPartition part : locParts.values()) {
+        for (GridDhtLocalPartition part : locParts) {
             GridDhtPartitionState state = part.state();
 
             if (state.active()) {
@@ -1172,7 +1183,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology
{
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName()
+ ", cache=" + cctx.name() + ']');
 
-        for (GridDhtLocalPartition part : locParts.values()) {
+        for (GridDhtLocalPartition part : locParts) {
             int size = part.size();
 
             if (size >= threshold)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 29c1d45..339649e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -329,42 +330,41 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         @Nullable final CacheEntryPredicate... filter) {
         final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
-        Collection<Cache.Entry<K, V>> entries =
-            F.flatCollections(
-                F.viewReadOnly(
-                    dht().topology().currentLocalPartitions(),
-                    new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>()
{
-                        @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition
p) {
-                            Collection<GridDhtCacheEntry> entries0 = p.entries();
-
-                            if (!F.isEmpty(filter))
-                                entries0 = F.view(entries0, new CacheEntryPredicateAdapter()
{
-                                    @Override public boolean apply(GridCacheEntryEx e) {
-                                        return F.isAll(e, filter);
-                                    }
-                                });
-
-                            return F.viewReadOnly(
-                                entries0,
-                                new C1<GridDhtCacheEntry, Cache.Entry<K, V>>()
{
-                                    @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry
e) {
-                                        return e.wrapLazyValue();
-                                    }
-                                },
-                                new P1<GridDhtCacheEntry>() {
-                                    @Override public boolean apply(GridDhtCacheEntry e) {
-                                        return !e.obsoleteOrDeleted();
-                                    }
-                                });
+        Iterable<GridDhtLocalPartition> primaryOnly = IgniteIterables.filter(dht().topology().currentLocalPartitions(),
+            new P1<GridDhtLocalPartition>() {
+            @Override public boolean apply(GridDhtLocalPartition p) {
+                return p.primary(topVer);
+            }
+        });
+
+        Iterable<Collection<Cache.Entry<K, V>>> entriesCol = IgniteIterables.transform(primaryOnly,
+            new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>()
{
+            @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition
p) {
+                Collection<GridDhtCacheEntry> entries0 = p.entries();
+
+                if (!F.isEmpty(filter))
+                    entries0 = F.view(entries0, new CacheEntryPredicateAdapter() {
+                        @Override public boolean apply(GridCacheEntryEx e) {
+                            return F.isAll(e, filter);
+                        }
+                    });
+
+                return F.viewReadOnly(
+                    entries0,
+                    new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() {
+                        @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry
e) {
+                            return e.wrapLazyValue();
                         }
                     },
-                    new P1<GridDhtLocalPartition>() {
-                        @Override public boolean apply(GridDhtLocalPartition p) {
-                            return p.primary(topVer);
+                    new P1<GridDhtCacheEntry>() {
+                        @Override public boolean apply(GridDhtCacheEntry e) {
+                            return !e.obsoleteOrDeleted();
                         }
-                    }));
+                    });
+            }
+        });
 
-        return new GridCacheEntrySet<>(ctx, entries, null);
+        return new GridCacheEntrySet<>(ctx, F.flatCollections(entriesCol), null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java
new file mode 100644
index 0000000..0e7c0a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class RarefiedConcurrentIntMap<T> implements Iterable<T> {
+    /** */
+    private final AtomicReferenceArray<T> arr;
+
+    /** */
+    private final int maxIdx;
+
+    /**
+     * @param maxIdx Max element index.
+     */
+    public RarefiedConcurrentIntMap(int maxIdx) {
+        arr = new AtomicReferenceArray<T>(maxIdx);
+
+        this.maxIdx = maxIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterator<T> iterator() {
+        return new Iterator<T>() {
+
+            private int idx;
+
+            private T next;
+
+            private T lastReturned;
+
+            private int lastReturnedIdx;
+
+            private void advance() {
+                while (next == null && idx < maxIdx)
+                    next = arr.get(idx++);
+            }
+
+            @Override public boolean hasNext() {
+                advance();
+
+                return next != null;
+            }
+
+            @Override public T next() {
+                advance();
+
+                if (next == null)
+                    throw new NoSuchElementException();
+
+                lastReturned = next;
+                lastReturnedIdx = idx - 1;
+
+                next = null;
+
+                return lastReturned;
+            }
+
+            @Override public void remove() {
+                if (lastReturned == null)
+                    throw new IllegalStateException();
+
+                arr.compareAndSet(lastReturnedIdx, lastReturned, null);
+
+                lastReturned = null;
+            }
+        };
+    }
+
+    /**
+     * @param idx Index.
+     */
+    public T get(int idx) {
+        return arr.get(idx);
+    }
+
+    /**
+     *
+     */
+    public int maxIndex() {
+        return maxIdx;
+    }
+
+    /**
+     * @param idx Index.
+     * @param expVal Expected value.
+     */
+    public boolean remove(int idx, T expVal) {
+        return arr.compareAndSet(idx, expVal, null);
+    }
+
+    /**
+     * @param idx Index.
+     * @param val Value.
+     */
+    public T putIfAbsent(int idx, T val) {
+        while (true) {
+            if (arr.compareAndSet(idx, null, val))
+                return null;
+
+            T res = arr.get(idx);
+
+            if (res != null)
+                return res;
+        }
+    }
+
+    /**
+     * @param c Closure.
+     */
+    public void addAllTo(Collection<? super T> c) {
+        for (int i = 0; i < maxIdx; i++) {
+            T e = arr.get(i);
+
+            if (e != null)
+                c.add(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index c86c5a4..c6137b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -2884,6 +2884,25 @@ public class GridFunc {
         if (F.isEmpty(c))
             return Collections.emptyList();
 
+        return flatCollections((Iterable<? extends Collection<T>>)c);
+    }
+
+    /**
+     * Flattens collection-of-collections and returns collection over the
+     * elements of the inner collections. This method doesn't create any
+     * new collections or copies any elements.
+     * <p>
+     * Note that due to non-copying nature of implementation, the
+     * {@link Collection#size() size()} method of resulting collection will have to
+     * iterate over all elements to produce size. Method {@link Collection#isEmpty() isEmpty()},
+     * however, is constant time and is much more preferable to use instead
+     * of {@code 'size()'} method when checking if list is not empty.
+     *
+     * @param c Input collection of collections.
+     * @param <T> Type of the inner collections.
+     * @return Iterable over the elements of the inner collections.
+     */
+    public static <T> Collection<T> flatCollections(@Nullable final Iterable<?
extends Collection<T>> c) {
         return new GridSerializableCollection<T>() {
             @NotNull
             @Override public Iterator<T> iterator() {


Mime
View raw message