ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [43/46] ignite git commit: IGNITE-3828: Added wrapper for DataStreamerImpl keys to minimize impact of hash code collisions. This closes #1034.
Date Mon, 12 Sep 2016 08:26:57 GMT
IGNITE-3828: Added wrapper for DataStreamerImpl keys to minimize impact of hash code collisions.
This closes #1034.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c3993d9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c3993d9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c3993d9

Branch: refs/heads/master
Commit: 6c3993d9d4b2126a4ef9699fdb3c0d296b03dea7
Parents: 65c92fa
Author: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Authored: Fri Sep 9 13:09:40 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Sep 9 13:09:40 2016 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 69 +++++++++++++++-----
 1 file changed, 51 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6c3993d9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a3bae24..05e6488 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import javax.cache.expiry.ExpiryPolicy;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -139,7 +140,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** Cache name ({@code null} for default cache). */
     private final String cacheName;
 
-
     /** Per-node buffer size. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
@@ -512,7 +512,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             activeFuts.add(resFut);
 
-            Collection<KeyCacheObject> keys =
+            Collection<KeyCacheObjectWrapper> keys =
                 new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
 
             Collection<DataStreamerEntry> entries0 = new ArrayList<>(entries.size());
@@ -521,7 +521,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true);
                 CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, entry.getValue(), true);
 
-                keys.add(key);
+                keys.add(new KeyCacheObjectWrapper(key));
 
                 entries0.add(new DataStreamerEntry(key, val));
             }
@@ -572,13 +572,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             activeFuts.add(resFut);
 
-            Collection<KeyCacheObject> keys = null;
+            Collection<KeyCacheObjectWrapper> keys = null;
 
             if (entries.size() > 1) {
                 keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
 
                 for (DataStreamerEntry entry : entries)
-                    keys.add(entry.getKey());
+                    keys.add(new KeyCacheObjectWrapper(entry.getKey()));
             }
 
             load0(entries, resFut, keys, 0);
@@ -641,7 +641,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     private void load0(
         Collection<? extends DataStreamerEntry> entries,
         final GridFutureAdapter<Object> resFut,
-        @Nullable final Collection<KeyCacheObject> activeKeys,
+        @Nullable final Collection<KeyCacheObjectWrapper> activeKeys,
         final int remaps
     ) {
         assert entries != null;
@@ -729,7 +729,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                         if (activeKeys != null) {
                             for (DataStreamerEntry e : entriesForNode)
-                                activeKeys.remove(e.getKey());
+                                activeKeys.remove(new KeyCacheObjectWrapper(e.getKey()));
 
                             if (activeKeys.isEmpty())
                                 resFut.onDone();
@@ -1103,7 +1103,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
      * @throws org.apache.ignite.plugin.security.SecurityException If permissions are not enough for streaming.
      */
     private void checkSecurityPermission(SecurityPermission perm)
-        throws org.apache.ignite.plugin.security.SecurityException{
+        throws org.apache.ignite.plugin.security.SecurityException {
         if (!ctx.security().enabled())
             return;
 
@@ -1172,8 +1172,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
          * @param newEntries Infos.
          * @param topVer Topology version.
          * @param lsnr Listener for the operation future.
-         * @throws IgniteInterruptedCheckedException If failed.
          * @return Future for operation.
+         * @throws IgniteInterruptedCheckedException If failed.
          */
         @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
             AffinityTopologyVersion topVer,
@@ -1221,7 +1221,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /**
          * @return Future if any submitted.
-         *
          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
          */
         @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
@@ -1273,13 +1272,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
             if (timeout == DFLT_UNLIMIT_TIMEOUT)
                 U.acquire(sem);
-            else
-                if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to add parallel operation.");
+            else if (!U.tryAcquire(sem, timeout, TimeUnit.MILLISECONDS)) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to add parallel operation.");
 
-                    throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout when starts parallel operation.");
-                }
+                throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout when starts parallel operation.");
+            }
         }
 
         /**
@@ -1307,7 +1305,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             try {
                 incrementActiveTasks();
-            } catch (IgniteDataStreamerTimeoutException e) {
+            }
+            catch (IgniteDataStreamerTimeoutException e) {
                 curFut.onDone(e);
                 throw e;
             }
@@ -1574,7 +1573,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 if (depCls != null)
                     cls0 = depCls;
                 else {
-                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext(); ) {
                         Object o = it.next();
 
                         if (o != null)
@@ -1696,4 +1695,38 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             return PUBLIC_POOL;
         }
     }
+
+    /**
+     * Key object wrapper. Using identity equals prevents slow down in case of hash code collision.
+     */
+    private static class KeyCacheObjectWrapper {
+        /** key object */
+        private final KeyCacheObject key;
+
+        /**
+         * Constructor
+         *
+         * @param key key object
+         */
+        KeyCacheObjectWrapper(KeyCacheObject key) {
+            assert key != null;
+
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return o instanceof KeyCacheObjectWrapper && this.key == ((KeyCacheObjectWrapper)o).key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(KeyCacheObjectWrapper.class, this);
+        }
+    }
 }


Mime
View raw message